
shard flush buffer to multi core #387

Open
lokax wants to merge 6 commits into main from yf-pro-ckpt

Conversation

@lokax
Collaborator

@lokax lokax commented Jan 30, 2026

Here are some reminders before you submit the pull request

  • Add tests for the change
  • Document changes
  • Reference the related issue using fixes eloqdb/tx_service#issue_id
  • Reference the RFC link if one exists
  • Pass ./mtr --suite=mono_main,mono_multi,mono_basic

Summary by CodeRabbit

  • Performance Improvements

    • Optimized batch write operations with pre-allocated buffers, reducing dynamic memory growth.
    • Enhanced memory efficiency through precomputed size calculations for bulk operations.
    • Improved worker resource utilization with per-worker flush architecture and task distribution.
  • Style

    • Minor formatting and whitespace adjustments.

Copilot AI review requested due to automatic review settings January 30, 2026 04:11
@coderabbitai

coderabbitai bot commented Jan 30, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

The PR refactors transaction service components to support per-worker flush handling and introduces centralized data-key accounting. Changes span memory optimization in data store operations, restructured flush buffer management from single global to per-worker arrays, updated API signatures, and new counter-based accounting for data and dirty keys.

Changes

  • Data Store Optimization (store_handler/data_store_service_client.cpp, store_handler/eloq_data_store_service/eloq_store_data_store.cpp): Whitespace adjustment in PutAll; BuildKey/BuildValue precomputation and capacity reservation; BatchWriteRecords refactored to use preallocated vectors with offset tracking instead of dynamic growth; TTL handling updated to zero expire_ts_ when TTL equals UINT64_MAX.
  • CC Request Restructuring (tx_service/include/cc/cc_request.h): Removed the ScanBatchSize constant and per-key timing/pause fields (last_datasync_ts, scan_ts, pause_key) from ScanDeltaSizeCcForHashPartition; introduced per-cc-map counters data_key_count_ and dirty_key_count_; updated memory estimation to use key-count metrics; simplified the constructor signature.
  • Per-Worker Flush Architecture (tx_service/include/cc/local_cc_shards.h): Replaced the single cur_flush_buffer_ with a per-worker cur_flush_buffers_ vector; replaced the single pending_flush_work_ deque with a vector of per-worker deques; added a DataSyncWorkerToFlushDataWorker() mapping method; updated the FlushDataWorker() and FlushData() signatures to accept a worker_idx parameter.
  • Data-Key Accounting Infrastructure (tx_service/include/cc/template_cc_map.h): Added an AdjustDataKeyStats() method for centralized footprint accounting; introduced data_key_count_ and dirty_data_key_count_ members; replaced scattered stat updates across the entry lifecycle (insertion, deletion, flush, commit) with coordinated accounting calls; propagates counters to requests via SetKeyCounts().
  • Per-Worker Flush Implementation (tx_service/src/cc/local_cc_shards.cpp): Major refactor of StartBackgroudWorkers() to initialize per-worker buffers and launch per-worker threads; DataSyncForHashPartition reworked for per-core partitioning and per-worker buffer routing; AddFlushTaskEntry updated to dispatch tasks to per-worker queues using the mapping logic; FlushCurrentFlushBuffer and related functions restructured for per-worker queue management; data-path and error handling aligned with the per-worker context.

Sequence Diagram(s)

sequenceDiagram
    participant DS as DataSync Worker
    participant Router as DataSyncWorkerToFlushDataWorker
    participant FQ as Per-Worker Flush Queue
    participant FW as FlushData Worker
    participant FT as FlushDataTask
    participant IO as I/O Layer

    DS->>Router: Map worker_id to target flush_worker
    Router-->>DS: Return target_worker_idx
    DS->>FQ: Route FlushTaskEntry to pending_flush_work_[target_idx]
    FQ->>FQ: Enqueue task to per-worker queue
    FW->>FW: FlushDataWorker(worker_idx) thread active
    FW->>FQ: Dequeue from pending_flush_work_[worker_idx]
    FQ-->>FW: FlushDataTask
    FW->>FT: Process FlushDataTask
    FT->>IO: Execute flush I/O operations
    IO-->>FT: Completion
    FT->>FW: Return status
    FW->>FW: Update per-worker memory quotas

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs

Suggested reviewers

  • liunyl
  • thweetkomputer

🐰 Per-worker buffers now align,
Data-key accounting locked in,
No more global flush clashing—
Workers dance, queues thrashing,
Memory optimized with a grin!

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (2 warnings)

  • Description check ⚠️ Warning: The PR description consists only of an unchecked template checklist, with no implementation details, context, rationale, or completed items. Resolution: Fill in the PR description with actual implementation details, explain the motivation for the change, document any breaking API changes, and clarify which checklist items were completed.
  • Docstring Coverage ⚠️ Warning: Docstring coverage is 8.70%, below the required threshold of 80.00%. Resolution: Write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (1 passed)

  • Title check ✅ Passed: The title 'shard flush buffer to multi core' directly describes the main architectural change: refactoring the single shard flush buffer to support multi-core/per-worker flushing.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.



Copilot AI left a comment


Pull request overview

This PR implements per-worker flush buffers to enable multi-core parallel flushing during data synchronization. The main goal is to improve performance by sharding the flush buffer across multiple cores instead of using a single shared buffer.

Changes:

  • Replaced single shared cur_flush_buffer_ with per-worker cur_flush_buffers_ vector
  • Modified flush workers to process their dedicated buffers and queues
  • Simplified delta size scanning by using per-map statistics instead of iterative scanning
  • Added partition-based routing to assign flush tasks to specific workers

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 7 comments.

Show a summary per file

  • tx_service/src/cc/local_cc_shards.cpp: Core implementation of per-worker flush buffers, partition-based task routing, and simplified delta size calculation
  • tx_service/include/cc/local_cc_shards.h: Updated class members to use vectors for per-worker buffers and queues
  • tx_service/include/cc/template_cc_map.h: Added per-map key count tracking and removed iterative delta size scanning logic
  • tx_service/include/cc/cc_request.h: Simplified ScanDeltaSizeCcForHashPartition to use statistics instead of scanning
  • store_handler/eloq_data_store_service/eloqstore: Updated subproject commit reference
  • store_handler/eloq_data_store_service/eloq_store_data_store.cpp: Minor whitespace formatting
  • store_handler/data_store_service_client.cpp: Minor whitespace formatting


static_cast<uint64_t>(MB(conf.at("node_memory_limit_mb")) *
(FLAGS_ckpt_buffer_ratio - 0.025))),
flush_data_worker_ctx_(conf.at("core_num")),
// cur_flush_buffers_ will be resized in StartBackgroudWorkers()

Copilot AI Jan 30, 2026


Corrected spelling of 'Backgroud' to 'Background'.

Suggested change
// cur_flush_buffers_ will be resized in StartBackgroudWorkers()
// cur_flush_buffers_ will be resized in StartBackgroundWorkers()

Comment on lines +4873 to +4875
return (hash_code & 0x3FF) >= min_partition_id_this_scan &&
(hash_code & 0x3FF) <= max_partition_id_this_scan &&
(!filter_func || filter_func(hash_code));

Copilot AI Jan 30, 2026


The magic number 0x3FF (1023) is used without explanation. This masks the lower 10 bits to extract a partition ID in the range [0, 1023], which matches the partition_number constant defined at line 4815. Consider defining this as a named constant (e.g., PARTITION_MASK or HASH_PARTITION_MASK) to improve code clarity and maintainability.
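The named-constant suggestion above could look like the following sketch. The constant names and the helper function are hypothetical; only the 1024-partition scheme and the 0x3FF mask come from the review.

```cpp
#include <cassert>
#include <cstdint>

// Assumed 1024-partition hash scheme; the PR uses the literal 0x3FF.
constexpr uint32_t kHashPartitionCount = 1024;
constexpr uint32_t kHashPartitionMask = kHashPartitionCount - 1;  // 0x3FF

// Masking the low 10 bits yields a partition id in [0, 1023].
inline uint32_t PartitionOf(uint64_t hash_code)
{
    return static_cast<uint32_t>(hash_code & kHashPartitionMask);
}
```

Deriving the mask from the count also keeps the two values from drifting apart if the partition number ever changes (as long as it stays a power of two).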

{
// Merge successful, task was merged into last_task
flush_data_worker_ctx_.cv_.notify_one();
flush_data_worker_ctx_.cv_.notify_all();

Copilot AI Jan 30, 2026


Changed from notify_one() to notify_all(). With per-worker queues where each worker only processes its own queue, notify_all() wakes all workers even though only one worker's queue has new work. Consider using targeted notification or reverting to notify_one() if workers can determine which queue has work, as notify_all() may cause unnecessary wake-ups and context switches.
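One way to get the targeted notification the comment asks for is to give each worker queue its own condition variable, so the producer wakes exactly the worker whose queue received work. This is a minimal sketch under that assumption; the type and member names are illustrative and do not come from the PR.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <vector>

struct Task {};

struct WorkerQueue
{
    std::mutex mtx_;
    std::condition_variable cv_;
    std::deque<Task> tasks_;
};

// Hypothetical per-worker wakeup: one mutex/cv pair per queue.
class PerWorkerDispatcher
{
public:
    explicit PerWorkerDispatcher(size_t worker_num) : queues_(worker_num) {}

    void Dispatch(size_t worker_idx, Task task)
    {
        WorkerQueue &q = queues_[worker_idx];
        {
            std::lock_guard<std::mutex> lk(q.mtx_);
            q.tasks_.push_back(std::move(task));
        }
        // Only the owning worker waits on this cv, so notify_one() wakes
        // exactly the right thread: no thundering herd, no lost wakeup.
        q.cv_.notify_one();
    }

    size_t PendingCount(size_t worker_idx)
    {
        WorkerQueue &q = queues_[worker_idx];
        std::lock_guard<std::mutex> lk(q.mtx_);
        return q.tasks_.size();
    }

private:
    std::vector<WorkerQueue> queues_;
};
```

With a single shared cv, by contrast, notify_all() is the safe-but-wasteful choice, which is exactly the trade-off the comment is flagging.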

// Could not merge, wait if queue is full
while (pending_flush_work_.size() >=
static_cast<size_t>(flush_data_worker_ctx_.worker_num_))
while (pending_flush_work.size() >= 2)

Copilot AI Jan 30, 2026


The hard-coded limit of 2 pending tasks per worker queue replaces the previous dynamic limit based on total worker count. Consider defining this as a named constant (e.g., MAX_PENDING_TASKS_PER_WORKER) to make the limit configurable and improve code maintainability.

Copilot uses AI. Check for mistakes.
pending_flush_work_.emplace_back(std::move(flush_data_task));
flush_data_worker_ctx_.cv_.notify_one();
pending_flush_work.emplace_back(std::move(flush_data_task));
flush_data_worker_ctx_.cv_.notify_all();

Copilot AI Jan 30, 2026


Changed from notify_one() to notify_all(). With per-worker queues, using notify_all() here wakes all workers unnecessarily when only one worker's queue received work. This could lead to spurious wake-ups and reduced performance under high concurrency.

Copilot uses AI. Check for mistakes.

// Access ScanSliceDeltaSizeCcForRangePartition member variable is
// unsafe after SetFinished().
req.SetKeyCounts(data_key_count_, dirty_data_key_count_);

Copilot AI Jan 30, 2026


The method reads data_key_count_ and dirty_data_key_count_ without synchronization. If these fields are modified concurrently by other threads (e.g., via AdjustDataKeyStats), this could result in reading inconsistent or torn values. Consider adding appropriate synchronization or using atomic types if concurrent access is possible.

Copilot uses AI. Check for mistakes.
Comment on lines +8533 to +8550
void AdjustDataKeyStats(int64_t size_delta, int64_t dirty_delta)
{
if (size_delta != 0)
{
assert(size_delta >= 0 ||
data_key_count_ >= static_cast<size_t>(-size_delta));
data_key_count_ = static_cast<size_t>(
static_cast<int64_t>(data_key_count_) + size_delta);
}

if (dirty_delta != 0)
{
assert(dirty_delta >= 0 ||
dirty_data_key_count_ >= static_cast<size_t>(-dirty_delta));
dirty_data_key_count_ = static_cast<size_t>(
static_cast<int64_t>(dirty_data_key_count_) + dirty_delta);
}
}

Copilot AI Jan 30, 2026


The method modifies data_key_count_ and dirty_data_key_count_ without synchronization. If this method is called concurrently from multiple threads, race conditions could occur leading to incorrect counts. Consider adding synchronization (e.g., mutex or atomic operations) to ensure thread-safe updates.

Copilot uses AI. Check for mistakes.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@store_handler/eloq_data_store_service/eloqstore`:
- Line 1: The repository's submodule declared in .gitmodules is not initialized
in CI causing an empty directory; update the CI pipeline (e.g., the
checkout/build job) to initialize submodules by running git submodule update
--init --recursive (or enable submodules in your checkout action/step) before
the build step so the eloqstore submodule is populated for the build.

In `@tx_service/include/cc/template_cc_map.h`:
- Around line 8533-8550: AdjustDataKeyStats currently relies on asserts to
prevent underflow but asserts are omitted in release builds; replace the
assert-only checks with runtime guards that perform saturating arithmetic: for
each of size_delta and dirty_delta, compute with int64_t (e.g., curr =
static_cast<int64_t>(data_key_count_)) and if delta is negative and -delta >
curr then clamp the result to 0 (or handle as an error/return), otherwise add
the delta and assign back via static_cast<size_t>. Apply the same pattern to
dirty_data_key_count_ in AdjustDataKeyStats to avoid size_t wrap-around in
release builds.
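The saturating update that the agent prompt describes might be sketched like this. The member and method names follow the PR; the clamp-to-zero policy and the standalone class wrapper are assumptions made for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Sketch: clamp at zero instead of relying on asserts (which vanish in
// release builds) so a negative delta can never wrap a size_t counter.
class DataKeyStats
{
public:
    void AdjustDataKeyStats(int64_t size_delta, int64_t dirty_delta)
    {
        data_key_count_ = ApplyDelta(data_key_count_, size_delta);
        dirty_data_key_count_ = ApplyDelta(dirty_data_key_count_, dirty_delta);
    }

    size_t DataKeyCount() const { return data_key_count_; }
    size_t DirtyDataKeyCount() const { return dirty_data_key_count_; }

private:
    static size_t ApplyDelta(size_t curr_u, int64_t delta)
    {
        int64_t curr = static_cast<int64_t>(curr_u);
        if (delta < 0 && -delta > curr)
        {
            // Underflow would wrap the unsigned counter; saturate at zero.
            return 0;
        }
        return static_cast<size_t>(curr + delta);
    }

    size_t data_key_count_ = 0;
    size_t dirty_data_key_count_ = 0;
};
```

If the counters can also be read or written from other threads (as the earlier review comment suggests), the same logic would want std::atomic counters or an external lock on top of the saturation guard.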
🧹 Nitpick comments (1)
tx_service/include/cc/cc_request.h (1)

9105-9113: Prevent overflow in UpdatedMemory estimation.
dirty_key_count_ * memory_usage_ can overflow on large tables and silently under-estimate memory. Consider widening the intermediate and clamping.

Suggested fix
-        return static_cast<size_t>(
-            (dirty_key_count_ * memory_usage_ + data_key_count_ - 1) /
-            data_key_count_);
+        const __uint128_t numerator =
+            static_cast<__uint128_t>(dirty_key_count_) * memory_usage_ +
+            (data_key_count_ - 1);
+        const __uint128_t estimate = numerator / data_key_count_;
+        return estimate > std::numeric_limits<size_t>::max()
+                   ? std::numeric_limits<size_t>::max()
+                   : static_cast<size_t>(estimate);

If <limits> isn’t already included, add it.

@@ -1 +1 @@
Subproject commit ca395ee54d6fe039bd3b2870d4e9a5d106f512fc
Subproject commit d687e1249a54722faef46e584169329d9676220b


⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check submodule registration and checkout status
echo "=== Submodule status ==="
git submodule status --recursive 2>&1 || echo "No submodules or error"

echo -e "\n=== .gitmodules configuration ==="
git config --file .gitmodules --get-regexp path 2>&1 || echo "No .gitmodules or no paths found"

echo -e "\n=== Path existence check ==="
echo "store_handler/ directory:"
ls -la store_handler/ 2>&1 | head -20

echo -e "\neloq_data_store_service/ directory:"
ls -la store_handler/eloq_data_store_service/ 2>&1 | head -20

echo -e "\neloqstore/ directory:"
ls -la store_handler/eloq_data_store_service/eloqstore 2>&1 || echo "Path does not exist"

echo -e "\n=== Git modules configuration ==="
cat .gitmodules 2>&1 || echo "No .gitmodules file"

Repository: eloqdata/tx_service

Length of output: 4128


Submodule not initialized — CI must run git submodule update --init --recursive.

The submodule is registered in .gitmodules but not initialized in the repository checkout. The directory exists but is empty, which is why the build fails. Ensure CI configuration initializes all submodules before the build step.

🧰 Tools
🪛 GitHub Actions: Copilot code review

[error] 1-1: autofind command failed. Error getting common flags: diff references file 'store_handler/eloq_data_store_service/eloqstore' that is not present in repo (and not marked deleted) – this often means the repository directory argument is incorrect or not the repo root. Command: autofind run '${REPO_CHECKOUT_PATH}' --extra extra.json --diff-file diff-file.diff ...

🤖 Prompt for AI Agents
In `@store_handler/eloq_data_store_service/eloqstore` at line 1, The repository's
submodule declared in .gitmodules is not initialized in CI causing an empty
directory; update the CI pipeline (e.g., the checkout/build job) to initialize
submodules by running git submodule update --init --recursive (or enable
submodules in your checkout action/step) before the build step so the eloqstore
submodule is populated for the build.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@store_handler/eloq_data_store_service/eloq_store_data_store.cpp`:
- Around line 247-249: The branchless mapping `entry.op_ =
static_cast<::eloqstore::WriteOp>(1 -
static_cast<uint8_t>(write_req->KeyOpType(i)));` assumes specific enum ordinals
for the source WriteOpType (DELETE==0, PUT==1) and target ::eloqstore::WriteOp
(Upsert==0, Delete==1); add defensive static assertions immediately above this
assignment to lock those assumptions at compile time (e.g.
static_assert(static_cast<uint8_t>(WriteOpType::DELETE)==0 &&
static_cast<uint8_t>(WriteOpType::PUT)==1, "Unexpected WriteOpType ordinals");
and similarly for ::eloqstore::WriteOp
(static_assert(static_cast<uint8_t>(::eloqstore::WriteOp::Upsert)==0 &&
static_cast<uint8_t>(::eloqstore::WriteOp::Delete)==1, "Unexpected
eloqstore::WriteOp ordinals")); this ensures any future reordering of
WriteOpType or ::eloqstore::WriteOp will fail to compile rather than silently
corrupt mappings for entry.op_ and write_req->KeyOpType(i).

Comment on lines +247 to +249
// Branchless: WriteOpType::PUT(1)->Upsert(0), DELETE(0)->Delete(1)
entry.op_ = static_cast<::eloqstore::WriteOp>(
1 - static_cast<uint8_t>(write_req->KeyOpType(i)));


⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for WriteOpType and eloqstore::WriteOp enum definitions
echo "=== Searching for WriteOpType enum ==="
rg -n "enum.*WriteOpType" -A 10 -B 2

echo ""
echo "=== Searching for eloqstore WriteOp enum ==="
rg -n "enum.*WriteOp" -A 10 -B 2 --type cpp

echo ""
echo "=== Searching for WriteOp in .proto files ==="
fd "\.proto$" -x cat -n {}

Repository: eloqdata/tx_service

Length of output: 50375


🏁 Script executed:

#!/bin/bash
# Search for eloqstore WriteOp enum definition more specifically
echo "=== Searching for eloqstore::WriteOp enum ==="
rg "namespace eloqstore" -A 50 | rg -A 5 "enum.*WriteOp|WriteOp\s*{"

echo ""
echo "=== Searching in .proto files for eloqstore package ==="
rg "package eloqstore" -A 100 | head -150

echo ""
echo "=== Direct search for WriteOp enum ==="
rg "enum.*WriteOp|WriteOp\s*\{" -A 5 --type cpp --type proto

Repository: eloqdata/tx_service

Length of output: 366


🏁 Script executed:

#!/bin/bash
# Search for eloqstore WriteOp enum with proper regex escaping
echo "=== Searching for WriteOp enum in all files ==="
rg "enum.*WriteOp" -A 5

echo ""
echo "=== Searching for Upsert and Delete in context ==="
rg "Upsert|Delete" | grep -i "writeop\|enum" -B 2 -A 2

echo ""
echo "=== Searching in .proto files ==="
fd "\.proto$" -x rg "WriteOp\|Upsert\|Delete" -l {}

Repository: eloqdata/tx_service

Length of output: 3713


🏁 Script executed:

#!/bin/bash
# Search for eloqstore namespace and WriteOp
echo "=== Searching for eloqstore namespace ==="
rg "namespace eloqstore" -A 100 | head -200

echo ""
echo "=== Searching for WriteOp in include directories ==="
fd -t f "\.h$|\.hpp$" -x rg -l "eloqstore.*WriteOp|WriteOp.*Upsert|WriteOp.*Delete" {}

echo ""
echo "=== Searching for eloqstore in directory structure ==="
find . -type d -name "*eloq*" 2>/dev/null | head -20

Repository: eloqdata/tx_service

Length of output: 430


🏁 Script executed:

#!/bin/bash
# List files in eloqstore directory
echo "=== Files in eloqstore directory ==="
find ./store_handler/eloq_data_store_service/eloqstore -type f -name "*.h" -o -name "*.hpp" -o -name "*.proto"

echo ""
echo "=== Search for WriteOp in eloqstore directory ==="
rg "WriteOp" ./store_handler/eloq_data_store_service/eloqstore -A 10 -B 2

Repository: eloqdata/tx_service

Length of output: 150


🏁 Script executed:

#!/bin/bash
# Broad search for WriteOp enum definition
echo "=== Search for 'enum.*WriteOp' in all C++ and header files ==="
rg "enum" -t cpp -t cc -A 3 | rg -B 2 -A 3 "WriteOp"

echo ""
echo "=== Check the actual usage context in eloq_store_data_store.cpp ==="
rg -B 5 -A 5 "entry.op_.*WriteOp\|WriteOp::Upsert\|WriteOp::Delete" ./store_handler/eloq_data_store_service/eloq_store_data_store.cpp

echo ""
echo "=== Search for proto files that might define WriteOp ==="
fd -e proto -x cat {} \; | rg -B 2 -A 2 "WriteOp\|message.*Write"

Repository: eloqdata/tx_service

Length of output: 296


🏁 Script executed:

#!/bin/bash
# Read the actual file around the problematic code
echo "=== Context around line 247 in eloq_store_data_store.cpp ==="
cat -n ./store_handler/eloq_data_store_service/eloq_store_data_store.cpp | sed -n '230,270p'

echo ""
echo "=== Search for all WriteOp:: usages ==="
rg "WriteOp::" ./store_handler/eloq_data_store_service/ -B 1 -A 1

echo ""
echo "=== Check includes at top of eloq_store_data_store.cpp ==="
head -50 ./store_handler/eloq_data_store_service/eloq_store_data_store.cpp | grep -E "#include|namespace"

Repository: eloqdata/tx_service

Length of output: 2911


Branchless enum conversion relies on undocumented ordinal assumptions—add static asserts to prevent silent data corruption.

The conversion 1 - static_cast<uint8_t>(write_req->KeyOpType(i)) assumes WriteOpType::DELETE == 0, PUT == 1 and implicitly that eloqstore::WriteOp::Upsert == 0, Delete == 1. While the current enum values match these assumptions (confirmed in data_store_service.h), future enum reorderings would cause silent mapping failures (deletes become upserts or vice versa).

Add static assertions to guard this assumption:

Suggested defensive approach
+        // Ensure enum ordinals match expected values for branchless conversion
+        static_assert(static_cast<uint8_t>(WriteOpType::DELETE) == 0);
+        static_assert(static_cast<uint8_t>(WriteOpType::PUT) == 1);
+        static_assert(static_cast<int>(::eloqstore::WriteOp::Upsert) == 0);
+        static_assert(static_cast<int>(::eloqstore::WriteOp::Delete) == 1);
         // Branchless: WriteOpType::PUT(1)->Upsert(0), DELETE(0)->Delete(1)
         entry.op_ = static_cast<::eloqstore::WriteOp>(
             1 - static_cast<uint8_t>(write_req->KeyOpType(i)));
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Branchless: WriteOpType::PUT(1)->Upsert(0), DELETE(0)->Delete(1)
entry.op_ = static_cast<::eloqstore::WriteOp>(
1 - static_cast<uint8_t>(write_req->KeyOpType(i)));
// Ensure enum ordinals match expected values for branchless conversion
static_assert(static_cast<uint8_t>(WriteOpType::DELETE) == 0);
static_assert(static_cast<uint8_t>(WriteOpType::PUT) == 1);
static_assert(static_cast<int>(::eloqstore::WriteOp::Upsert) == 0);
static_assert(static_cast<int>(::eloqstore::WriteOp::Delete) == 1);
// Branchless: WriteOpType::PUT(1)->Upsert(0), DELETE(0)->Delete(1)
entry.op_ = static_cast<::eloqstore::WriteOp>(
1 - static_cast<uint8_t>(write_req->KeyOpType(i)));
🤖 Prompt for AI Agents
In `@store_handler/eloq_data_store_service/eloq_store_data_store.cpp` around lines
247 - 249, The branchless mapping `entry.op_ =
static_cast<::eloqstore::WriteOp>(1 -
static_cast<uint8_t>(write_req->KeyOpType(i)));` assumes specific enum ordinals
for the source WriteOpType (DELETE==0, PUT==1) and target ::eloqstore::WriteOp
(Upsert==0, Delete==1); add defensive static assertions immediately above this
assignment to lock those assumptions at compile time (e.g.
static_assert(static_cast<uint8_t>(WriteOpType::DELETE)==0 &&
static_cast<uint8_t>(WriteOpType::PUT)==1, "Unexpected WriteOpType ordinals");
and similarly for ::eloqstore::WriteOp
(static_assert(static_cast<uint8_t>(::eloqstore::WriteOp::Upsert)==0 &&
static_cast<uint8_t>(::eloqstore::WriteOp::Delete)==1, "Unexpected
eloqstore::WriteOp ordinals")); this ensures any future reordering of
WriteOpType or ::eloqstore::WriteOp will fail to compile rather than silently
corrupt mappings for entry.op_ and write_req->KeyOpType(i).


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@store_handler/eloq_data_store_service/internal_request.h`:
- Around line 174-197: KeyOpTypeBatch currently assumes proto enum ordinals
match WriteOpType; change it to mirror the explicit mapping used by KeyOpType()
(use a switch mapping proto op_type() -> WriteOpType values and handle unknown
values with a default/ASSERT) and add defensive bounds and output-pointer checks
in KeyOpTypeBatch, GetRecordTsBatch, and GetRecordTtlBatch: verify start_idx +
count <= req_->items_size() and that out is non-null (or return/ASSERT on
failure) before iterating to avoid out-of-range reads. Ensure you reference the
existing req_->items(...) access and WriteOpType when applying these changes.
- Around line 337-360: Add defensive bounds and null checks to the batch memcpy
methods: in GetRecordTsBatch, GetRecordTtlBatch and KeyOpTypeBatch verify out is
non-null and that start_idx + count does not exceed the backing container size
(use ts_->size(), ttl_->size(), op_types_->size()) before calling memcpy; if
count == 0 return early as today, otherwise assert or throw on invalid ranges so
callers cannot trigger out-of-bounds reads. Ensure the checks are applied
consistently in all three functions and keep the existing memcpy behavior when
the inputs are valid.

Comment on lines +174 to +197
void GetRecordTsBatch(size_t start_idx, size_t count, uint64_t *out) const override
{
for (size_t i = 0; i < count; ++i)
{
out[i] = req_->items(static_cast<int>(start_idx + i)).ts();
}
}

void GetRecordTtlBatch(size_t start_idx, size_t count, uint64_t *out) const override
{
for (size_t i = 0; i < count; ++i)
{
out[i] = req_->items(static_cast<int>(start_idx + i)).ttl();
}
}

void KeyOpTypeBatch(size_t start_idx, size_t count, WriteOpType *out) const override
{
for (size_t i = 0; i < count; ++i)
{
out[i] = static_cast<WriteOpType>(
req_->items(static_cast<int>(start_idx + i)).op_type());
}
}


⚠️ Potential issue | 🟠 Major

Fix KeyOpTypeBatch mapping and add bounds/out checks.
KeyOpType() uses explicit mapping, but KeyOpTypeBatch() relies on ordinal equivalence, which can silently flip DELETE/PUT if enums ever diverge. Also, the batch getters can read out-of-range with no guard.

🛠️ Proposed fix (explicit mapping + defensive asserts)
 void GetRecordTsBatch(size_t start_idx, size_t count, uint64_t *out) const override
 {
+    assert(out != nullptr);
+    assert(start_idx + count <= static_cast<size_t>(req_->items_size()));
     for (size_t i = 0; i < count; ++i)
     {
         out[i] = req_->items(static_cast<int>(start_idx + i)).ts();
     }
 }

 void GetRecordTtlBatch(size_t start_idx, size_t count, uint64_t *out) const override
 {
+    assert(out != nullptr);
+    assert(start_idx + count <= static_cast<size_t>(req_->items_size()));
     for (size_t i = 0; i < count; ++i)
     {
         out[i] = req_->items(static_cast<int>(start_idx + i)).ttl();
     }
 }

 void KeyOpTypeBatch(size_t start_idx, size_t count, WriteOpType *out) const override
 {
+    assert(out != nullptr);
+    assert(start_idx + count <= static_cast<size_t>(req_->items_size()));
     for (size_t i = 0; i < count; ++i)
     {
-        out[i] = static_cast<WriteOpType>(
-            req_->items(static_cast<int>(start_idx + i)).op_type());
+        const auto op =
+            req_->items(static_cast<int>(start_idx + i)).op_type();
+        if (op == remote::WriteOpType::Delete)
+        {
+            out[i] = WriteOpType::DELETE;
+        }
+        else
+        {
+            assert(op == remote::WriteOpType::Put);
+            out[i] = WriteOpType::PUT;
+        }
     }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-void GetRecordTsBatch(size_t start_idx, size_t count, uint64_t *out) const override
-{
-    for (size_t i = 0; i < count; ++i)
-    {
-        out[i] = req_->items(static_cast<int>(start_idx + i)).ts();
-    }
-}
-void GetRecordTtlBatch(size_t start_idx, size_t count, uint64_t *out) const override
-{
-    for (size_t i = 0; i < count; ++i)
-    {
-        out[i] = req_->items(static_cast<int>(start_idx + i)).ttl();
-    }
-}
-void KeyOpTypeBatch(size_t start_idx, size_t count, WriteOpType *out) const override
-{
-    for (size_t i = 0; i < count; ++i)
-    {
-        out[i] = static_cast<WriteOpType>(
-            req_->items(static_cast<int>(start_idx + i)).op_type());
-    }
-}
+void GetRecordTsBatch(size_t start_idx, size_t count, uint64_t *out) const override
+{
+    assert(out != nullptr);
+    assert(start_idx + count <= static_cast<size_t>(req_->items_size()));
+    for (size_t i = 0; i < count; ++i)
+    {
+        out[i] = req_->items(static_cast<int>(start_idx + i)).ts();
+    }
+}
+void GetRecordTtlBatch(size_t start_idx, size_t count, uint64_t *out) const override
+{
+    assert(out != nullptr);
+    assert(start_idx + count <= static_cast<size_t>(req_->items_size()));
+    for (size_t i = 0; i < count; ++i)
+    {
+        out[i] = req_->items(static_cast<int>(start_idx + i)).ttl();
+    }
+}
+void KeyOpTypeBatch(size_t start_idx, size_t count, WriteOpType *out) const override
+{
+    assert(out != nullptr);
+    assert(start_idx + count <= static_cast<size_t>(req_->items_size()));
+    for (size_t i = 0; i < count; ++i)
+    {
+        const auto op =
+            req_->items(static_cast<int>(start_idx + i)).op_type();
+        if (op == remote::WriteOpType::Delete)
+        {
+            out[i] = WriteOpType::DELETE;
+        }
+        else
+        {
+            assert(op == remote::WriteOpType::Put);
+            out[i] = WriteOpType::PUT;
+        }
+    }
+}
🤖 Prompt for AI Agents
In `@store_handler/eloq_data_store_service/internal_request.h` around lines 174 -
197, KeyOpTypeBatch currently assumes proto enum ordinals match WriteOpType;
change it to mirror the explicit mapping used by KeyOpType() (use a switch
mapping proto op_type() -> WriteOpType values and handle unknown values with a
default/ASSERT) and add defensive bounds and output-pointer checks in
KeyOpTypeBatch, GetRecordTsBatch, and GetRecordTtlBatch: verify start_idx +
count <= req_->items_size() and that out is non-null (or return/ASSERT on
failure) before iterating to avoid out-of-range reads. Ensure you reference the
existing req_->items(...) access and WriteOpType when applying these changes.

Comment on lines +337 to +360
void GetRecordTsBatch(size_t start_idx, size_t count, uint64_t *out) const override
{
    if (count != 0u)
    {
        std::memcpy(out, ts_->data() + start_idx, count * sizeof(uint64_t));
    }
}

void GetRecordTtlBatch(size_t start_idx, size_t count, uint64_t *out) const override
{
    if (count != 0u)
    {
        std::memcpy(out, ttl_->data() + start_idx, count * sizeof(uint64_t));
    }
}

void KeyOpTypeBatch(size_t start_idx, size_t count, WriteOpType *out) const override
{
    if (count != 0u)
    {
        std::memcpy(out, op_types_->data() + start_idx,
                    count * sizeof(WriteOpType));
    }
}

⚠️ Potential issue | 🟠 Major

Add bounds/out checks to batch memcpy paths.
These memcpy paths can read past the vector bounds if the caller passes an incorrect range.

🛠️ Proposed fix (defensive asserts)
 void GetRecordTsBatch(size_t start_idx, size_t count, uint64_t *out) const override
 {
-    if (count != 0u)
+    assert(out != nullptr);
+    assert(start_idx + count <= ts_->size());
+    if (count != 0u)
     {
         std::memcpy(out, ts_->data() + start_idx, count * sizeof(uint64_t));
     }
 }

 void GetRecordTtlBatch(size_t start_idx, size_t count, uint64_t *out) const override
 {
-    if (count != 0u)
+    assert(out != nullptr);
+    assert(start_idx + count <= ttl_->size());
+    if (count != 0u)
     {
         std::memcpy(out, ttl_->data() + start_idx, count * sizeof(uint64_t));
     }
 }

 void KeyOpTypeBatch(size_t start_idx, size_t count, WriteOpType *out) const override
 {
-    if (count != 0u)
+    assert(out != nullptr);
+    assert(start_idx + count <= op_types_->size());
+    if (count != 0u)
     {
         std::memcpy(out, op_types_->data() + start_idx,
                    count * sizeof(WriteOpType));
     }
 }
📝 Committable suggestion


Suggested change
-void GetRecordTsBatch(size_t start_idx, size_t count, uint64_t *out) const override
-{
-    if (count != 0u)
-    {
-        std::memcpy(out, ts_->data() + start_idx, count * sizeof(uint64_t));
-    }
-}
-void GetRecordTtlBatch(size_t start_idx, size_t count, uint64_t *out) const override
-{
-    if (count != 0u)
-    {
-        std::memcpy(out, ttl_->data() + start_idx, count * sizeof(uint64_t));
-    }
-}
-void KeyOpTypeBatch(size_t start_idx, size_t count, WriteOpType *out) const override
-{
-    if (count != 0u)
-    {
-        std::memcpy(out, op_types_->data() + start_idx,
-                    count * sizeof(WriteOpType));
-    }
-}
+void GetRecordTsBatch(size_t start_idx, size_t count, uint64_t *out) const override
+{
+    assert(out != nullptr);
+    assert(start_idx + count <= ts_->size());
+    if (count != 0u)
+    {
+        std::memcpy(out, ts_->data() + start_idx, count * sizeof(uint64_t));
+    }
+}
+void GetRecordTtlBatch(size_t start_idx, size_t count, uint64_t *out) const override
+{
+    assert(out != nullptr);
+    assert(start_idx + count <= ttl_->size());
+    if (count != 0u)
+    {
+        std::memcpy(out, ttl_->data() + start_idx, count * sizeof(uint64_t));
+    }
+}
+void KeyOpTypeBatch(size_t start_idx, size_t count, WriteOpType *out) const override
+{
+    assert(out != nullptr);
+    assert(start_idx + count <= op_types_->size());
+    if (count != 0u)
+    {
+        std::memcpy(out, op_types_->data() + start_idx,
+                    count * sizeof(WriteOpType));
+    }
+}
🤖 Prompt for AI Agents
In `@store_handler/eloq_data_store_service/internal_request.h` around lines 337 -
360, Add defensive bounds and null checks to the batch memcpy methods: in
GetRecordTsBatch, GetRecordTtlBatch and KeyOpTypeBatch verify out is non-null
and that start_idx + count does not exceed the backing container size (use
ts_->size(), ttl_->size(), op_types_->size()) before calling memcpy; if count ==
0 return early as today, otherwise assert or throw on invalid ranges so callers
cannot trigger out-of-bounds reads. Ensure the checks are applied consistently
in all three functions and keep the existing memcpy behavior when the inputs are
valid.

@lokax lokax force-pushed the yf-pro-ckpt branch 4 times, most recently from 9e35ff6 to b550035 Compare January 30, 2026 08:02
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@tx_service/src/cc/local_cc_shards.cpp`:
- Around line 4865-4878: The current filter_lambda builds a contiguous min/max
range from partition_ids which erroneously includes partitions from other
workers; instead compute masked = (hash_code & 0x3FF) and test membership
against the actual partition_ids slice for this scan (e.g. check masked equals
one of partition_ids[i .. i+partition_number_per_scan-1] or use an inline
set/lookup over that slice) and then combine that membership test with the
existing data_sync_task->filter_lambda_ check; update filter_lambda (and any
helpers) to use the masked value and explicit membership against the local
partition_ids window rather than the min/max bounds.

@lokax lokax force-pushed the yf-pro-ckpt branch 3 times, most recently from 927250a to 29e89ee Compare January 30, 2026 09:33
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
tx_service/src/cc/local_cc_shards.cpp (1)

5889-6023: ⚠️ Potential issue | 🔴 Critical

PutAll is commented out, so flush never writes base data.
succ remains true and the pipeline proceeds, but no data is persisted—this is a silent data-loss path unless explicitly gated. Please restore the call (or guard it behind a flag with a clear comment).

🐛 Proposed fix
-        // succ = store_hd_->PutAll(flush_task_entries);
+        succ = store_hd_->PutAll(flush_task_entries);
🤖 Fix all issues with AI agents
In `@tx_service/src/cc/local_cc_shards.cpp`:
- Around line 5806-5885: The producer methods AddFlushTaskEntry and
FlushCurrentFlushBuffer perform unprotected accesses to cur_flush_buffers_ and
pending_flush_work_ causing a race with FlushDataWorker; acquire
flush_data_worker_ctx_.mux_ with a std::unique_lock<std::mutex> (e.g. worker_lk)
at the top of each function before any access to cur_flush_buffers_,
cur_flush_buffer.AddFlushTaskEntry, or cur_flush_buffer.MoveFlushData, and hold
that lock while you inspect/modify pending_flush_work_ and attempt
MergeFrom/.emplace_back; release the lock only after these buffer/queue
operations complete (or scope the unique_lock accordingly) so all buffer
accesses in AddFlushTaskEntry and FlushCurrentFlushBuffer are protected by
flush_data_worker_ctx_.mux_.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@tx_service/src/cc/local_cc_shards.cpp`:
- Around line 282-313: Re-enable the worker-count enforcement so per-worker
buffers cannot be indexed out of bounds: restore a check that compares
flush_data_worker_ctx_.worker_num_ with data_sync_worker_ctx_.worker_num_ (e.g.,
an assert and/or LOG(FATAL) with both counts) before using those values to size
cur_flush_buffers_ and pending_flush_work_; ensure the check runs in the
constructor/initialization path that creates cur_flush_buffers_ and that any
mismatch prevents continuing (this protects DataSyncForHashPartition which
indexes by data-sync worker_idx).

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@tx_service/src/cc/local_cc_shards.cpp`:
- Around line 4836-4842: The indexing into cur_flush_buffers_ using worker_idx /
2 is unsafe and can go out of bounds; change it to use the same mapping used
elsewhere (map worker_idx to a flush worker via modulo) and guard against empty
cur_flush_buffers_. Specifically, replace cur_flush_buffers_[worker_idx / 2]
with cur_flush_buffers_[worker_idx % cur_flush_buffers_.size()] (or an
equivalent helper used elsewhere) and add a pre-check that
cur_flush_buffers_.empty() is handled (e.g., set flush_buffer_size to 0 or bail)
so UpdatedMemory()/partition logic remains safe; update uses of
flush_buffer_size accordingly to ensure consistency with the existing
worker->flush-worker mapping logic.

@lokax lokax force-pushed the yf-pro-ckpt branch 7 times, most recently from a5a15e4 to 0ad47f1 Compare February 2, 2026 04:07
Collaborator

@yi-xmu yi-xmu left a comment

+1

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🤖 Fix all issues with AI agents
In `@tx_service/include/cc/local_cc_shards.h`:
- Around line 2455-2461: Ensure flush_data_worker_ctx_.worker_num_ is validated
right after it is initialized in the constructor for the class that defines
DataSyncWorkerToFlushDataWorker; add an assert or runtime check (and
fallback/throw) so worker_num_ >= 1 (e.g., assert(worker_num_ >= 1) or if
(worker_num_ == 0) set to 1 or throw std::runtime_error). This prevents
divide-by-zero in DataSyncWorkerToFlushDataWorker (which uses
flush_data_worker_ctx_.worker_num_) and in StartBackgroundWorkers buffer-size
calculations; locate the constructor that sets
flush_data_worker_ctx_.worker_num_ and add the validation there.

In `@tx_service/src/cc/local_cc_shards.cpp`:
- Around line 282-303: The comment claims worker counts are aligned with
data_sync_worker_ctx_.worker_num_ but the code uses
flush_data_worker_ctx_.worker_num_ without validating equality; add a check
(e.g., an assert or runtime error) that data_sync_worker_ctx_.worker_num_ ==
flush_data_worker_ctx_.worker_num_ before using worker_num (or else change the
comment to reflect that only flush_data_worker_ctx_.worker_num_ is
authoritative). Place the check near where worker_num is set/used (references:
flush_data_worker_ctx_.worker_num_, data_sync_worker_ctx_.worker_num_,
worker_num, cur_flush_buffers_, pending_flush_work_, and the thread-start loop)
so the buffers, pending_flush_work_.resize(...) and thread creation only proceed
when counts match.
- Around line 5860-5893: FlushCurrentFlushBuffer can append to
pending_flush_work_ without respecting the queue cap and backpressure; modify it
to wait using the same condition variable logic as elsewhere before emplacing a
new task: inside the loop in LocalCcShards::FlushCurrentFlushBuffer, before
calling pending_flush_work.emplace_back(std::move(flush_data_task)), wait on
flush_data_worker_ctx_.cv_ (using the held worker_lk) until
pending_flush_work.size() is below the configured cap (the same limit checked
elsewhere) or until notified, then proceed to emplace; use the same
predicate/timeout semantics used by the other producer code paths so forced
flushes honor the backpressure on pending_flush_work_.

Comment on lines +2455 to +2461
/**
* @brief Map a data_sync_worker index to the fixed flush_data_worker index.
* Used when data_sync_worker count != flush_data_worker count so that each
* data_sync_worker consistently targets one flush_data_worker.
*/
size_t DataSyncWorkerToFlushDataWorker(size_t data_sync_worker_id) const;


⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# Find the .cpp file and locate the implementation of DataSyncWorkerToFlushDataWorker
fd -e cpp -path "*/tx_service/*" local_cc_shards | head -20

Repository: eloqdata/tx_service

Length of output: 231


🏁 Script executed:

# Search for DataSyncWorkerToFlushDataWorker implementation
rg -n "DataSyncWorkerToFlushDataWorker" --type cpp -A 10

Repository: eloqdata/tx_service

Length of output: 3867


🏁 Script executed:

# Search for flush_data_worker_ctx_ initialization and worker_num_
rg -n "flush_data_worker_ctx_" --type cpp -A 3 -B 1

Repository: eloqdata/tx_service

Length of output: 8710


🏁 Script executed:

# Search for worker_num_ validation/checks
rg -n "worker_num_" --type cpp --type h -A 2 -B 1

Repository: eloqdata/tx_service

Length of output: 9840


Validate flush_data_worker_ctx_.worker_num_ is never zero to prevent divide-by-zero errors.

The modulo operation in DataSyncWorkerToFlushDataWorker (line 5809) will crash if flush_data_worker_ctx_.worker_num_ is 0. In non-EXT_TX_PROC_ENABLED builds, the initialization at line 141 uses std::min(conf.at("core_num"), 10), which could be 0 if core_num is misconfigured. A divide-by-zero would occur even earlier in StartBackgroundWorkers (line 290) when calculating buffer size, but the root issue is the lack of validation at initialization. Add an assert or check to ensure worker_num_ >= 1 immediately after initialization in the constructor initializer list.

🤖 Prompt for AI Agents
In `@tx_service/include/cc/local_cc_shards.h` around lines 2455 - 2461, Ensure
flush_data_worker_ctx_.worker_num_ is validated right after it is initialized in
the constructor for the class that defines DataSyncWorkerToFlushDataWorker; add
an assert or runtime check (and fallback/throw) so worker_num_ >= 1 (e.g.,
assert(worker_num_ >= 1) or if (worker_num_ == 0) set to 1 or throw
std::runtime_error). This prevents divide-by-zero in
DataSyncWorkerToFlushDataWorker (which uses flush_data_worker_ctx_.worker_num_)
and in StartBackgroundWorkers buffer-size calculations; locate the constructor
that sets flush_data_worker_ctx_.worker_num_ and add the validation there.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (3)
tx_service/include/cc/template_cc_map.h (1)

8544-8566: ⚠️ Potential issue | 🟠 Major

Release-build counter underflow is still possible in AdjustDataKeyStats.

Line 8553 and Line 8561 rely on assert for underflow prevention; in release builds this can wrap size_t and corrupt key stats.

🛠️ Suggested runtime-clamp fix
 void AdjustDataKeyStats(int64_t size_delta, int64_t dirty_delta)
 {
     if (table_name_.IsMeta())
     {
         return;
     }

     if (size_delta != 0)
     {
-        assert(size_delta >= 0 ||
-               data_key_count_ >= static_cast<size_t>(-size_delta));
-        data_key_count_ = static_cast<size_t>(
-            static_cast<int64_t>(data_key_count_) + size_delta);
+        const int64_t next_size =
+            static_cast<int64_t>(data_key_count_) + size_delta;
+        if (next_size < 0)
+        {
+            data_key_count_ = 0;
+        }
+        else
+        {
+            data_key_count_ = static_cast<size_t>(next_size);
+        }
     }

     if (dirty_delta != 0)
     {
-        assert(dirty_delta >= 0 ||
-               dirty_data_key_count_ >= static_cast<size_t>(-dirty_delta));
-        dirty_data_key_count_ = static_cast<size_t>(
-            static_cast<int64_t>(dirty_data_key_count_) + dirty_delta);
+        const int64_t next_dirty =
+            static_cast<int64_t>(dirty_data_key_count_) + dirty_delta;
+        if (next_dirty < 0)
+        {
+            dirty_data_key_count_ = 0;
+        }
+        else
+        {
+            dirty_data_key_count_ = static_cast<size_t>(next_dirty);
+        }
     }
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/include/cc/template_cc_map.h` around lines 8544 - 8566,
AdjustDataKeyStats currently uses assert to prevent underflow which is disabled
in release builds; change the logic in AdjustDataKeyStats (and the dirty-data
branch) to perform runtime checks and clamp instead of asserting: after the
table_name_.IsMeta() early return, for size_delta compute the new
data_key_count_ by if size_delta >= 0 adding it, otherwise subtracting
min(static_cast<size_t>(-size_delta), data_key_count_) so it never wraps; apply
the same safe-clamp pattern to dirty_data_key_count_ for dirty_delta; remove the
reliance on assert so release builds cannot underflow.
tx_service/src/cc/local_cc_shards.cpp (2)

5903-5905: ⚠️ Potential issue | 🟠 Major

Forced flush path still bypasses queue backpressure

FlushCurrentFlushBuffer can enqueue new tasks without applying the same queue-cap wait used in AddFlushTaskEntry, so repeated forced flushes can grow pending_flush_work_ beyond the intended bound.

🧯 Proposed fix
-            // Add as new task
-            pending_flush_work.emplace_back(std::move(flush_data_task));
+            // Add as new task (respect queue backpressure)
+            while (pending_flush_work.size() >= 2)
+            {
+                flush_data_worker_ctx_.cv_.wait(worker_lk);
+            }
+            pending_flush_work.emplace_back(std::move(flush_data_task));
             flush_data_worker_ctx_.cv_.notify_all();
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/src/cc/local_cc_shards.cpp` around lines 5903 - 5905,
FlushCurrentFlushBuffer enqueues flush tasks directly via
pending_flush_work_.emplace_back(...) and notify_all, bypassing the
queue-backpressure logic used by AddFlushTaskEntry; modify
FlushCurrentFlushBuffer to reuse the same backpressure/wait behavior (or call
AddFlushTaskEntry) before adding the task: before emplacing the flush_data_task,
wait on flush_data_worker_ctx_.cv_ with the same predicate/limit check used in
AddFlushTaskEntry (or invoke AddFlushTaskEntry(flush_data_task)) so
pending_flush_work_ cannot grow beyond the intended bound, then emplace/move and
notify as before.

4874-4889: ⚠️ Potential issue | 🔴 Critical

Critical: partition window filter still includes partitions owned by other workers

The min/max range check over a strided partition_ids slice admits partitions that belong to different workers, causing duplicate/incorrect scans and checkpoint updates.

🐛 Proposed fix
-        size_t min_partition_id_this_scan = partition_ids[i];
-        size_t max_partition_id_this_scan =
-            partition_ids[std::min(i + partition_number_per_scan,
-                                   partition_number_this_core) -
-                          1];
+        const size_t min_partition_idx = i;
+        const size_t max_partition_idx =
+            std::min(i + partition_number_per_scan, partition_number_this_core);
         std::function<bool(size_t)> filter_lambda =
-            [min_partition_id_this_scan,
-             max_partition_id_this_scan,
+            [worker_idx,
+             core_number,
+             min_partition_idx,
+             max_partition_idx,
              &filter_func =
                  data_sync_task->filter_lambda_](const size_t hash_code)
         {
-            return (hash_code % total_hash_partitions) >=
-                       min_partition_id_this_scan &&
-                   (hash_code % total_hash_partitions) <=
-                       max_partition_id_this_scan &&
-                   (!filter_func || filter_func(hash_code));
+            const size_t part_id = hash_code % total_hash_partitions;
+            if (part_id % core_number != worker_idx)
+            {
+                return false;
+            }
+            const size_t local_idx = (part_id - worker_idx) / core_number;
+            return local_idx >= min_partition_idx &&
+                   local_idx < max_partition_idx &&
+                   (!filter_func || filter_func(hash_code));
         };

Based on learnings: shard_code & 0x3FF plus modulo core mapping is intentional for ownership/load balancing, so filter predicates must preserve modulo-based ownership semantics.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/src/cc/local_cc_shards.cpp` around lines 4874 - 4889, The current
filter_lambda uses a contiguous range check
(min_partition_id_this_scan..max_partition_id_this_scan) which admits partitions
from other workers due to strided partition_ids; replace the range test with an
explicit membership check against the actual partition_ids slice for this scan.
Concretely, build the set/list of allowed partition ids from partition_ids[i ..
i+partition_number_per_scan-1] (or iterate that slice) and change the predicate
inside filter_lambda to compute p = hash_code % total_hash_partitions and return
(p is one of the allowed partition ids) && (!filter_func ||
filter_func(hash_code)); keep using data_sync_task->filter_lambda_ and the hash
modulo semantics (e.g. shard_code & 0x3FF then % mapping) unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@tx_service/include/cc/cc_request.h`:
- Around line 9105-9113: The UpdatedMemory() method can overflow when computing
dirty_key_count_ * memory_usage_ and can return values > total memory if
dirty_key_count_ temporarily exceeds data_key_count_; update UpdatedMemory() to
first clamp dirty_key_count_ to at most data_key_count_, perform the arithmetic
in a wider unsigned integer type (e.g., uint64_t or size_t_t widened type) to
avoid multiplication overflow, compute the rounded-up division safely, and
finally clamp the result to not exceed memory_usage_ (or the total memory
represented by memory_usage_) before casting back to size_t; reference
UpdatedMemory(), dirty_key_count_, data_key_count_, and memory_usage_ to locate
and change the logic.

---

Duplicate comments:
In `@tx_service/include/cc/template_cc_map.h`:
- Around line 8544-8566: AdjustDataKeyStats currently uses assert to prevent
underflow which is disabled in release builds; change the logic in
AdjustDataKeyStats (and the dirty-data branch) to perform runtime checks and
clamp instead of asserting: after the table_name_.IsMeta() early return, for
size_delta compute the new data_key_count_ by if size_delta >= 0 adding it,
otherwise subtracting min(static_cast<size_t>(-size_delta), data_key_count_) so
it never wraps; apply the same safe-clamp pattern to dirty_data_key_count_ for
dirty_delta; remove the reliance on assert so release builds cannot underflow.

In `@tx_service/src/cc/local_cc_shards.cpp`:
- Around line 5903-5905: FlushCurrentFlushBuffer enqueues flush tasks directly
via pending_flush_work_.emplace_back(...) and notify_all, bypassing the
queue-backpressure logic used by AddFlushTaskEntry; modify
FlushCurrentFlushBuffer to reuse the same backpressure/wait behavior (or call
AddFlushTaskEntry) before adding the task: before emplacing the flush_data_task,
wait on flush_data_worker_ctx_.cv_ with the same predicate/limit check used in
AddFlushTaskEntry (or invoke AddFlushTaskEntry(flush_data_task)) so
pending_flush_work_ cannot grow beyond the intended bound, then emplace/move and
notify as before.
- Around line 4874-4889: The current filter_lambda uses a contiguous range check
(min_partition_id_this_scan..max_partition_id_this_scan) which admits partitions
from other workers due to strided partition_ids; replace the range test with an
explicit membership check against the actual partition_ids slice for this scan.
Concretely, build the set/list of allowed partition ids from partition_ids[i ..
i+partition_number_per_scan-1] (or iterate that slice) and change the predicate
inside filter_lambda to compute p = hash_code % total_hash_partitions and return
(p is one of the allowed partition ids) && (!filter_func ||
filter_func(hash_code)); keep using data_sync_task->filter_lambda_ and the hash
modulo semantics (e.g. shard_code & 0x3FF then % mapping) unchanged.

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1a32187 and b4e33a2.

📒 Files selected for processing (6)
  • store_handler/data_store_service_client.cpp
  • store_handler/eloq_data_store_service/eloq_store_data_store.cpp
  • tx_service/include/cc/cc_request.h
  • tx_service/include/cc/local_cc_shards.h
  • tx_service/include/cc/template_cc_map.h
  • tx_service/src/cc/local_cc_shards.cpp
🚧 Files skipped from review as they are similar to previous changes (1)
  • store_handler/data_store_service_client.cpp

Comment on lines 9105 to +9113
 size_t UpdatedMemory() const
 {
-    const uint64_t scanned = scanned_key_count_;
-    if (scanned == 0)
+    if (data_key_count_ == 0)
         return 0;
+    // Use cc map's data_key_count_ and dirty_key_count_ for estimation.
     // integer math with rounding up to avoid systematic underestimation
     return static_cast<size_t>(
-        (updated_key_count_ * memory_usage_ + scanned - 1) / scanned);
+        (dirty_key_count_ * memory_usage_ + data_key_count_ - 1) /
+        data_key_count_);

⚠️ Potential issue | 🟡 Minor

Guard UpdatedMemory() against counter drift and arithmetic overflow.

Line 9112 can overflow in dirty_key_count_ * memory_usage_, and if dirty_key_count_ temporarily exceeds data_key_count_, the estimate can exceed total memory and distort flush decisions. Clamp dirty count to total count and compute with overflow-safe arithmetic.

🔧 Suggested fix
 size_t UpdatedMemory() const
 {
     if (data_key_count_ == 0)
         return 0;
-    // Use cc map's data_key_count_ and dirty_key_count_ for estimation.
-    // integer math with rounding up to avoid systematic underestimation
-    return static_cast<size_t>(
-        (dirty_key_count_ * memory_usage_ + data_key_count_ - 1) /
-        data_key_count_);
+    // Use cc map's data_key_count_ and dirty_key_count_ for estimation.
+    // Clamp to avoid over-estimation when counters are temporarily inconsistent.
+    const size_t effective_dirty_key_count =
+        std::min(dirty_key_count_, data_key_count_);
+    // Integer math with rounding up; split multiply/divide to reduce overflow risk.
+    const uint64_t base = memory_usage_ / data_key_count_;
+    const uint64_t rem = memory_usage_ % data_key_count_;
+    return static_cast<size_t>(
+        base * effective_dirty_key_count +
+        (rem * effective_dirty_key_count + data_key_count_ - 1) / data_key_count_);
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/include/cc/cc_request.h` around lines 9105 - 9113, The
UpdatedMemory() method can overflow when computing dirty_key_count_ *
memory_usage_ and can return values > total memory if dirty_key_count_
temporarily exceeds data_key_count_; update UpdatedMemory() to first clamp
dirty_key_count_ to at most data_key_count_, perform the arithmetic in a wider
unsigned integer type (e.g., uint64_t or size_t_t widened type) to avoid
multiplication overflow, compute the rounded-up division safely, and finally
clamp the result to not exceed memory_usage_ (or the total memory represented by
memory_usage_) before casting back to size_t; reference UpdatedMemory(),
dirty_key_count_, data_key_count_, and memory_usage_ to locate and change the
logic.

@lokax lokax force-pushed the yf-pro-ckpt branch 4 times, most recently from b9ebe1e to 6505b42 Compare March 6, 2026 07:37